package com.lyon.demo.protocol.netty.client;

import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.core.text.StrFormatter;
import com.lyon.demo.protocol.api.Protocols;
import com.lyon.demo.protocol.api.core.Command;
import com.lyon.demo.protocol.api.core.RemotingCallback;
import com.lyon.demo.protocol.api.core.command.ResponseFuture;
import com.lyon.demo.protocol.api.remoting.RemotingRequestHandler;
import com.lyon.demo.protocol.api.transport.Transport;
import com.lyon.demo.protocol.api.transport.TransportClient;
import com.lyon.demo.protocol.netty.config.NettyRemoteClientConfig;
import com.lyon.demo.protocol.netty.core.AbstractNettyRemotingService;
import com.lyon.demo.storage.common.spi.annotation.SpiActivate;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
 * Netty-based {@link TransportClient}: keeps one cached {@link Channel} per remote
 * {@link SocketAddress} and hands out {@link Transport} wrappers that write {@link Command}s to it.
 *
 * <p>Lifecycle: {@link #prepare(Properties)} resolves the configuration and creates the shared
 * inbound handler, {@link #start()} builds the {@link Bootstrap} and its event loop group, and
 * {@link #shutdown()} closes every cached channel and stops the event loop group.
 *
 * <p>NOTE(review): the lifecycle fields are plain (non-volatile) fields; lifecycle methods are
 * assumed to be called from a single thread — confirm against callers.
 *
 * @author LeeYan9
 * @since 2022-05-24
 */
@SuppressWarnings({"rawtypes", "noinspection", "FieldCanBeLocal", "unused", "unchecked"})
@SpiActivate(Protocols.NETTY)
@Slf4j
public class NettyRemoteClient extends AbstractNettyRemotingService<Command> implements TransportClient<ChannelHandlerContext, Command, Command, NettyRemoteClientConfig> {

    private Bootstrap bootstrap;
    /** Event loop group backing {@link #bootstrap}; kept as a field so {@link #shutdown()} can stop it. */
    private EventLoopGroup workGroup;
    private NettyRemoteClientConfig nettyClientConfig;
    private RemotingRequestHandler remotingRequestHandler;
    private Properties properties;
    /** Derived from the configured listen port in {@link #prepare(Properties)}; not used when connecting. */
    private SocketAddress localAddress;
    /** Connected channels keyed by remote address. */
    private final Map<SocketAddress, Channel> channelTable = new ConcurrentHashMap<>();

    /**
     * Resolves the client configuration and creates the shared handler instance.
     *
     * <p>Configuration precedence: a value already supplied through
     * {@link #setConfig(NettyRemoteClientConfig)}, then the {@code "netty-client-config"} entry of
     * {@code properties}, then {@link NettyRemoteClientConfig#ofDefault()}.
     *
     * @param properties optional properties; may be {@code null}
     */
    @Override
    public void prepare(Properties properties) {
        this.properties = properties;
        if (this.nettyClientConfig == null ) {
            this.nettyClientConfig = properties == null ? null : (NettyRemoteClientConfig) properties.get("netty-client-config");
        }
        if (nettyClientConfig == null) {
            this.nettyClientConfig = NettyRemoteClientConfig.ofDefault();
        }
        if (nettyClientConfig.getListenPort() > 0) {
            this.localAddress = new InetSocketAddress(nettyClientConfig.getListenPort());
        }
        prepareSharableInstances();
    }

    /** Creates the handler instance that is shared by every channel pipeline. */
    private void prepareSharableInstances() {
        this.remotingRequestHandler = new NettyClientHandler();
    }

    /**
     * Builds the client {@link Bootstrap} and its event loop group.
     *
     * <p>Calling this again while the client is already started is a no-op, so a repeated call
     * cannot leak a second event loop group. After {@link #shutdown()} the client can be started again.
     */
    @Override
    public void start() {
        if (this.bootstrap != null) {
            return;
        }
        EventLoopGroup group = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors());
        Bootstrap clientBootstrap = new Bootstrap();
        clientBootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) {
                        // SECURITY NOTE(review): ObjectDecoder performs Java-native deserialization of
                        // whatever the peer sends; this is only safe when the remote side is trusted.
                        //noinspection rawtypes
                        socketChannel
                                .pipeline()
                                .addLast(new ObjectEncoder())
                                .addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(getClass().getClassLoader())))
                                .addLast(new IdleStateHandler(0, 0, nettyClientConfig.getServerIdlTimeSeconds()))
                                .addLast((SimpleChannelInboundHandler) remotingRequestHandler);
                    }
                });
        this.workGroup = group;
        this.bootstrap = clientBootstrap;
    }

    /**
     * Supplies the configuration explicitly; takes precedence over the values
     * {@link #prepare(Properties)} would otherwise resolve.
     *
     * @param nettyClientConfig the configuration to use
     */
    @Override
    public void setConfig(NettyRemoteClientConfig nettyClientConfig) {
        this.nettyClientConfig = nettyClientConfig;
    }

    /**
     * Closes every cached channel, forgets them, and stops the event loop group created by
     * {@link #start()} so its threads do not outlive the client.
     */
    @Override
    public void shutdown() {
        channelTable.values().forEach(Channel::close);
        channelTable.clear();
        EventLoopGroup group = this.workGroup;
        if (group != null) {
            // Asynchronous: returns immediately while the group winds down in the background.
            group.shutdownGracefully();
        }
        this.workGroup = null;
        this.bootstrap = null;
        this.properties = null;
    }

    /**
     * Returns a transport bound to the channel for {@code socketAddress}, connecting first when no
     * usable channel is cached.
     *
     * @param socketAddress  remote address to connect to
     * @param connectTimeout maximum time to wait for the connection to be established
     * @return a transport that writes to the (possibly shared) channel
     */
    @Override
    public Transport createIfAbsent(SocketAddress socketAddress, Duration connectTimeout) {
        Channel channel = createChannel(socketAddress, connectTimeout);
        return new NettyTransport<>(channel);
    }

    /**
     * Returns the cached channel for {@code socketAddress} when it is still usable; otherwise
     * connects, waits up to {@code connectTimeout}, caches and returns the new channel.
     *
     * <p>Checked exceptions ({@link ConnectException}, {@link InterruptedException}) are rethrown
     * undeclared via {@link SneakyThrows}.
     *
     * @throws IllegalStateException   if {@link #start()} has not been called
     * @throws ConnectTimeoutException if the connection is not established within the timeout
     * @throws ConnectException        if the connection attempt fails
     */
    @SneakyThrows
    private Channel createChannel(SocketAddress socketAddress, Duration connectTimeout) {
        Objects.requireNonNull(socketAddress, "socketAddress");
        Objects.requireNonNull(connectTimeout, "connectTimeout");
        Channel cached = channelTable.get(socketAddress);
        if (isAvailableChannel(cached)) {
            return cached;
        }
        Bootstrap activeBootstrap = this.bootstrap;
        if (activeBootstrap == null) {
            throw new IllegalStateException("NettyRemoteClient is not started; call start() before connecting");
        }
        ChannelFuture channelFuture = activeBootstrap.connect(socketAddress);
        channelFuture.addListener(future -> {
            if (future.isSuccess()) {
                log.info("channel[{}] connected..", socketAddress);
            } else {
                log.error(StrFormatter.format("channel[{}] connected failure..", socketAddress), future.cause());
            }
        });
        boolean finished;
        try {
            finished = channelFuture.await(connectTimeout.toNanos(), TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            // Do not leave a half-open connection behind when the waiting thread is interrupted.
            abortConnect(channelFuture);
            throw e;
        }
        if (!finished) {
            // Timed out: abandon the pending connect instead of leaking it.
            abortConnect(channelFuture);
            throw new ConnectTimeoutException(StrFormatter.format("连接超时 {}", socketAddress));
        }
        Channel connected = channelFuture.channel();
        if (!channelFuture.isSuccess() || !isAvailableChannel(connected)) {
            connected.close();
            ConnectException failure = new ConnectException(StrFormatter.format("连接不可用 {}", socketAddress));
            if (channelFuture.cause() != null) {
                // Preserve the underlying reason for callers and logs.
                failure.initCause(channelFuture.cause());
            }
            throw failure;
        }
        // Another thread may have connected to the same address meanwhile: keep whichever usable
        // channel is already registered and discard the surplus one.
        Channel winner = channelTable.compute(socketAddress,
                (address, existing) -> isAvailableChannel(existing) ? existing : connected);
        if (winner != connected) {
            connected.close();
            return winner;
        }
        // Evict the entry once the channel closes so the table does not accumulate dead channels.
        connected.closeFuture().addListener(future -> channelTable.remove(socketAddress, connected));
        return connected;
    }

    /** Cancels a pending connect attempt and closes its channel. */
    private void abortConnect(ChannelFuture channelFuture) {
        channelFuture.cancel(true);
        channelFuture.channel().close();
    }

    /** A channel is usable when it is non-null, open and active. */
    private boolean isAvailableChannel(Channel channel) {
        if (Objects.isNull(channel)) {
            return false;
        }
        return channel.isOpen() && channel.isActive();
    }


    /**
     * {@link Transport} that writes commands to a single Netty {@link Channel} and registers a
     * {@link ResponseFuture} per request id in {@code responseTable} (not declared in this class;
     * presumably inherited from the enclosing class's superclass).
     *
     * @param <T> request command type
     * @param <R> response command type
     */
    @AllArgsConstructor
    public class NettyTransport<T extends Command, R extends Command> implements Transport<T, R> {

        private final Channel channel;

        /**
         * Sends {@code command} without a callback.
         *
         * @param command the command to write
         * @param timeout timeout recorded on the returned future; may be {@code null}
         * @return the future registered for the command's request id
         */
        @Override
        public ResponseFuture<R> send(T command, Duration timeout) {
            return doSend(command, timeout, null);
        }

        /**
         * Sends {@code command} and attaches {@code callback} to its response future.
         *
         * @param command  the command to write
         * @param timeout  timeout recorded on the future; may be {@code null}
         * @param callback callback stored on the response future
         */
        @Override
        public void asyncSend(T command, Duration timeout, RemotingCallback<ResponseFuture<R>> callback) {
            doSend(command, timeout, callback);
        }

        /** Same as {@link #asyncSend(Command, Duration, RemotingCallback)} with no timeout. */
        @Override
        public void asyncSend(T command, RemotingCallback<ResponseFuture<R>> callback) {
            asyncSend(command, null, callback);
        }

        /**
         * Registers a {@link ResponseFuture} under the command's request id and writes the command.
         * When the write fails, the future is completed exceptionally and its registration is
         * removed so it does not linger in {@code responseTable}.
         */
        private ResponseFuture<R> doSend(Command command, Duration timeout, RemotingCallback<ResponseFuture<R>> callback) {
            long opaque = command.getRequestId();
            ResponseFuture<R> responseFuture = new ResponseFuture<>(opaque);
            responseFuture.setCallback(callback);
            if (timeout != null) {
                responseFuture.setTimeoutMillis(timeout.toMillis());
            }
            ResponseFuture old = responseTable.putIfAbsent(opaque, responseFuture);
            if (old!=null) {
                // NOTE(review): the earlier future stays registered, so this request's future is not
                // in the table; the command is still sent, as before.
                log.error("请求id[{}]-已存在其他的请求信息", opaque);
            }
            try {
                channel.writeAndFlush(command)
                        .addListener(future -> {
                            if (future.isSuccess()) {
                                // TODO: consider logging successful sends.
                                responseFuture.setSendOk(true);
                            } else {
                                log.warn(CharSequenceUtil.format("请求id[{}]-发送请求失败", opaque), future.cause());
                                failSend(opaque, responseFuture, future.cause());
                            }
                        });
            } catch (Exception e) {
                log.error(CharSequenceUtil.format("请求id[{}]-发送请求失败", opaque), e);
                failSend(opaque, responseFuture, e);
            }
            return responseFuture;
        }

        /** Marks the send as failed, drops this future's registration (if it is ours) and completes it. */
        private void failSend(long opaque, ResponseFuture<R> responseFuture, Throwable cause) {
            responseFuture.setSendOk(false);
            // Two-argument remove: only removes the entry when it still maps to this very future.
            responseTable.remove(opaque, responseFuture);
            responseFuture.completeExceptionally(cause);
        }

    }

    /**
     * Sharable inbound handler that forwards every received {@link Command} to
     * {@code processRequestCommand} on the enclosing client.
     */
    @ChannelHandler.Sharable
    class NettyClientHandler extends SimpleChannelInboundHandler<Command> implements RemotingRequestHandler<ChannelHandlerContext, Command> {

        @Override
        public void processRequest(ChannelHandlerContext ctx, Command remoteCommand) {
            NettyRemoteClient.this.processRequestCommand(ctx, remoteCommand);
        }

        @Override
        protected void messageReceived(ChannelHandlerContext ctx, Command remoteCommand) {
            processRequest(ctx, remoteCommand);
        }
    }

}
